package org.totoro.lock.zookeeper;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Import;
import org.springframework.util.ObjectUtils;
import org.totoro.lock.DistributedLockSupport;
import org.totoro.lock.DistributedReentrantLockAbstract;
import org.totoro.lock.config.Constant;
import org.totoro.lock.config.LockZookeeperProperties;

import java.util.Comparator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
 * @author YHL
 * @version V1.0
 * @Description: 分布式锁 Zookeeper
 * @date 2018-12-04
 */
@Import(LockZookeeperProperties.class)
public class ZookeeperDistributedReentrantLock extends DistributedReentrantLockAbstract {

    private static final Logger LOGGER = LoggerFactory.getLogger(ZookeeperDistributedReentrantLock.class);

    @Autowired
    @Qualifier(Constant.Zookeeper.LOCK_ZOOKEEPER_CLIENT_BEAN_NAME)
    private ZooKeeper zooKeeperClient;

    private DistributedLockSupport distributedLockSupport;


    @Override
    public void lock(String lockId) throws KeeperException, InterruptedException {

        Stat exists = exists(lockId);

        String lockKey = getLockKey(lockId);

        if (exists == null) {
            byte[] lockBean = distributedLockSupport.getLockBean();
            try {
                zooKeeperClient.create(lockKey, lockBean, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            } catch (KeeperException.NodeExistsException e) {
                // 并发情况下，其他线程可以已经创建了，持久化节点
                LOGGER.debug("节点:{} 已经存在 ", lockId);

                boolean createTempNode = createTempNode(lockKey, lockBean);
                if (createTempNode == false) {
                    distributedLockSupport.lock();
                }
            }
        }

    }


    /**
     * 创建临时节点
     *
     * @param lockKey
     * @param lockBean
     * @return
     * @throws KeeperException
     * @throws InterruptedException
     */
    private boolean createTempNode(String lockKey, byte[] lockBean) throws KeeperException, InterruptedException {

        String ephemeralSequential = zooKeeperClient.create(lockKey + "/", lockBean,
                ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.EPHEMERAL_SEQUENTIAL);

        distributedLockSupport.setEphemeralSequential(ephemeralSequential);

        return bindWatch(lockKey, lockBean, ephemeralSequential);
    }

    /**
     * 绑定 监听事件
     *
     * @param lockKey
     * @param lockBean
     * @param ephemeralSequential
     * @return
     * @throws KeeperException
     * @throws InterruptedException
     */
    private boolean bindWatch(String lockKey, byte[] lockBean, String ephemeralSequential) throws KeeperException, InterruptedException {

        List<String> children = zooKeeperClient.getChildren(lockKey, false);

        if (!ObjectUtils.isEmpty(children)) {
            List<String> collect = children.stream().sorted(Comparator.naturalOrder()).collect(Collectors.toList());
            String first = collect.get(0);
            // 当前节点是最小的，说明可以持有锁
            if (ephemeralSequential.compareTo(first) <= -1) {
                // 资源节点 设置为 持有锁 线程
                Stat stat = zooKeeperClient.exists(lockKey, false);
                zooKeeperClient.setData(lockKey, lockBean, stat.getVersion());
                return true;
            }

            String preKey = findPreKey(ephemeralSequential, collect);

            /**
             * 设置回调
             */
            Stat exists = zooKeeperClient.exists(lockKey + "/" + preKey, o -> {






            });

            // 1、服务器宕机导致节点不存在,重新查询前节点
            if (exists == null) {
                return createTempNode(lockKey, lockBean);
            }

            return false;
        }

        return false;
    }


    /**
     * 查找前面的key
     *
     * @param ephemeralSequential
     * @param collect
     * @return
     */
    private String findPreKey(String ephemeralSequential, List<String> collect) {
        String preKey = collect.get(0);

        for (String key : collect) {
            if (key.equals(ephemeralSequential)) {
                break;
            }
            preKey = key;
        }

        return preKey;
    }

    private Stat exists(String lockId) throws KeeperException, InterruptedException {
        return zooKeeperClient.exists(getLockKey(lockId), false);
    }

    private String getLockKey(String lockId) {
        return Constant.Zookeeper.DISTRIBUTED_LOCK_TREE_ROOT + lockId;
    }


    @Override
    public void lockInterruptibly(String lockId) throws InterruptedException {
        super.lockInterruptibly(lockId);
    }

    @Override
    public boolean tryLock(String lockId) {
        return super.tryLock(lockId);
    }

    @Override
    public boolean tryLock(String lockId, long time, TimeUnit unit) throws InterruptedException {
        return super.tryLock(lockId, time, unit);
    }

    @Override
    public void unlock(String lockId) {
        super.unlock(lockId);
    }

}
